Damien Lucas
Software Engineer

dlucasd
Damien Lucas
Software Engineer

dlucasd

🧹 Grand ménage
💸 Coûts et complexité à maintenir
❌ Java 8 et Scala 2.12 plus supportés
☕️ Kafka Clients & Kafka Streams → Java 11
☕️ Broker, Connect, Tools, MM2 → Java 17
❌ Retrait des anciennes API client-broker
⚙️ Broker 4.0 ⇄ Clients ≥ 2.1, Clients 4.0 ⇄ Brokers ≥ 2.1
👨💻 Ops : activer la métrique DeprecatedRequestsPerSec sur les brokers (3.7+), si >0 alors impact
❌ Formats de message v0/v1 plus supportés en écriture
✉️ Le “nouveau” format v2 devient l’unique format côté écriture
❌ Sortir définitivement de Log4j 1.x
⚙️ Outil de migration log4j-transform-cli
💻 config-file convert -i log4j1 -o log4j2 <inputFile> <outputFile>
⏱️ linger.ms: 0 ms → 5 ms (batching ↑, latence +~5 ms)
⏱️ (log.)message.timestamp.after.max.ms: ∞ → 1 h (protège des timestamps “futurs”)
📝 (log.)segment.bytes (min): 14 B → 1 MB (évite micro-segments)
num.recovery.threads.per.data.dir: 1 → 2 (rétablissement plus rapide)


🗳️ Consensus distribué, élection de leader
🤝 Coordination du cluster (Broker en vie ? Qui est controller ? Pannes, découvertes …)
💾 Stockage des métadonnées (topics, partitions, ACL, …)
🏥 Avoir un cluster sain (pas de partition offline, réplication stable …)
⬆️ Monter en 3.9.x
🧪 Migration en environnement de tests
(et tester la procédure de rollback!)
📈 Monitorer les logs et JMX
Metrics local = new Metrics(new MetricConfig());
Sensor messageSensor = local.sensor("message-count-sensor");
LinkedHashMap<String, String> tags = new LinkedHashMap<>();
tags.put("component", "my-app");
MetricName metricName = new MetricName(
"total-messages-processed",
"application",
"Total messages processed",
tags
);
messageSensor.add(metricName, new CumulativeCount());
KafkaMetric metric = local.metrics().get(metricName);
consumer.registerMetricForSubscription(metric);
consumer.unregisterMetricFromSubscription(metric);PnDTnHnMn.nS
Exemples:
P30D → 30 jours
PT12H → 12 heures
P1DT30M → 1 jour et 30 minutes

🔁 Rebalance incrémental
🎯 Géré par le coordinator, facilite le débug
⚙️ Activé par défaut côté serveur, côté client : group.protocol=consumer
⚖️ Compromis : usage CPU côté serveur, empreinte mémoire plus élevée
💻 Outils d’admin : kafka-groups.sh, kafka-consumer-groups.sh
🧪 Early access en 4.1.0
🧩 Se base sur le Consumer Group Protocol
🔀 Même principe mais pour l’assignation des tâches Kafka Streams
en early access
Broker: Authorizer, CreateTopicPolicy, ClientQuotaCallback, ReplicaSelector
Producer: Serializer, Partitioner, ProducerInterceptor
Consumer: Deserializer, ConsumerInterceptor
Connect: Converter, Transform, Predicate, Connector, Task
public class MyInterceptor<K, V>
implements ProducerInterceptor<K, V>, Monitorable {
private Sensor sensor;
@Override
public void withPluginMetrics(PluginMetrics metrics) {
sensor = metrics.sensor("onSend");
MetricName rate = metrics.metricName("rate", "Average number of calls per second.", new LinkedHashMap<>());
MetricName total = metrics.metricName("total", "Total number of calls.", new LinkedHashMap<>());
sensor.add(rate, new Rate());
sensor.add(total, new CumulativeCount());
}
@Override
public ProducerRecord<K,V> onSend(ProducerRecord<K,V> record) {
sensor.record();
return record;
}
}Problème:
Producer: topic.a.b ✅
Consumer: topic.a.b → topic_a_b ❌
Solution:
4.x: Ajoute topic.a.b (nouveau) + conservation topic_a_b (déprécié)
5.0: Seulement topic.a.b 🎉
❗️ Problème: seul client_credentials supporté
Secret statique = Potentiel risque de sécurité
🔐 Solution: jwt-bearer (RFC 7523)
Authentification via JWT signé
Problème: 1 version par cluster
Difficulté à gérer les montées de versions, rollback etc …
🎛️ Nouveauté: Support de plusieurs versions de plugins sur un cluster
opt/
plugins/
foo-connector-1.8/
foo-connector-1.8.jar
foo-dependencies-1.0.jar
foo-connector-1.9/
foo-connector-1.9.jar
foo-dependencies-1.1.jar
bar-connector/
bar-connector-1.0.jarSpécifier la version souhaitée
"xxx.plugin.version": "[3.5,)"
Versions visibles de manière distincte dans l’API
Nouvelle manière de consommer un topic
👤 Message consommé par un seul consommateur
🔄 Traitement asynchrone
✅ Acquittement au message
🗑️ Suppression du message après traitement
🔧 API identique au consumer group
➗ Permet la consommation de messages d'une seule partition par plusieurs consommateurs
✅ Acquittement au message
share.acknowledgement.mode=implicit (default) : acquittement automatique lors du poll()
share.acknowledgement.mode=explicit
@️KafkaListener(
topics = "order-processing",
groupId = "order-processors",
containerFactory = "shareKafkaListenerContainerFactory"
)
public void processOrder(OrderEvent event,
ShareAcknowledgment acknowledgment) {
try {
if (!isValid(event)) {
log.info("Rejet du message, ne sera pas re-deliveré");
acknowledgment.reject();
return;
}
orderService.process(event);
acknowledgment.acknowledge();
} catch (TransientException e) {
log.info("Erreur temporaire, rejeu possible");
acknowledgment.release();
} catch (PermanentException e) {
log.info("Rejet du message, ne sera pas re-deliveré");
acknowledgment.reject();
}
}share.delivery.count.limit: 5
share.record.lock.duration.ms: 30000 (30 sec)
✉️ Nécessité d’acquittement au message avec rejeu/rejet
📈 Mise à l’échelle sans garantie d’ordre
🔁 Faciliter la migration de système type queue vers Kafka
🚀 Production ready pour la 4.2.0
💻 Outil d’admin : kafka-share-groups.sh
📨 Intégration de Dead Letter Queue
❓ Garantie d’ordre en réflexion
Version: Spring Kafka 4.0.0-RC1
🛠️ Passage total à KRaft, suppression ZooKeeper
🧪 Embedded Kafka pour les tests sur KRaft
🔄 Nouveau protocole rebalance (propriété Spring Boot)
👥 Support Kafka Queues
📊 Observabilité enrichie avec les métriques clients
🎯 Alignement Spring Framework 7: Jackson 3, Spring Retry natif
⚠️ RC, release pour fin d’année
Version: Quarkus 3.24+
Nouveau protocole rebalance ✅
KRaft ✅
Métriques clients ❌
Kafka Queues ❌
~3 Releases/An
Release mineure peut contenir des features majeures !
2.8.0 : KRaft intro
3.6.0 : Tiered Storage
Support : 3 dernières versions
📅 Sortie pour janvier/février 2026
📦 28 KIP figées
📊 10 KIP sur l’observabilité
📨 Dead letter queue native dans Kafka Streams
Feedback
Github
public class LoggingProcessorWrapper<KIn, VIn, KOut, VOut>
implements ProcessorWrapper<KIn, VIn, KOut, VOut> {
...
private static class LoggingProcessor<KIn, VIn, KOut, VOut>
implements Processor<KIn, VIn, KOut, VOut>, ProcessorContextAware {
...
@Override
public void init(ProcessorContext<KOut, VOut> context) {
delegate.init(context);
}
@Override
public void process(Record<KIn, VIn> record) {
Instant start = Instant.now();
logger.info("Entrée dans le processeur");
delegate.process(record);
var duration = Duration.between(start, Instant.now());
logger.info("Sortie du processeur : durée={}", duration.toMillis());
}
@Override
public void close() {
delegate.close();
}
}
}
________________________________
application.properties:
processor.wrapper.class=com.mirakl.LoggingProcessorWrapperAvant :
// Topic orders, key : {clientId, orderId}, value : {productId, quantity}
// Topic clients, key : {clientId}, value : {name, address}
// Étape intermédiaire : Ajouter orderId à la valeur
KTable<OrderKey, OrderWithClient> ordersWithClient = orders.mapValues(
(key, order) -> new OrderWithClient(key.getClientId(), order.getProductId(), order.getQuantity()),
);
KTable<OrderKey, EnrichedOrder> enriched = ordersWithClient.join(
clients,
// Extracteur de clé étrangère : uniquement la valeur
orderWithClient -> new ClientKey(orderWithClient.getClientId()),
(orderWithClient, client) -> EnrichedOrder.of(orderWithClient.getProductId(), orderWithClient.getQuantity(), client.getName(), client.getAddress()),
);Après :
KTable<OrderKey, EnrichedOrder> enriched = orders.join(
clients,
// Extracteur de clé étrangère : utilise clé (OrderKey) et valeur (Order)
(orderKey, order) -> new ClientKey(orderKey.getClientId()),
(order, client) -> EnrichedOrder.of(order.getProductId(), order.getQuantity(), client.getName(), client.getAddress()),
);Problème: flush() dans callback de send() → Deadlock sur ioThread
Timeout confusant, app bloquée, diagnostic difficile
Solution: KafkaException immédiate
Problème: Exceptions floues pour transactions (Retriable? Fatal? Abortable?)
Solution: 4 catégories + hiérarchie claire
Retriable (ex: TimeoutException) : Retry auto
RefreshRetriable (ex: UnknownTopic) : Refresh metadata + retry
Abortable : Abort transaction (ex: CommitFailed)
ApplicationRecoverable : Restart app/producer (ex: ProducerFenced)
InvalidConfig : Fix config (ex: AuthError)
$ bin/kafka-groups.sh --bootstrap-server localhost:9092 --list
GROUP TYPE PROTOCOL
old-consumer-group Classic consumer
new-consumer-group Consumer consumer
connect-cluster Classic connect
share-group Share share
schema-registry Classic sr
simple-consumer-group Classickafka-consumer-groups.sh
kafka-share-groups.sh